agentmux_srv\backend\blockcontroller/
persistent.rs

1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! PersistentSubprocessController: manages agent CLI as a long-running process
5//! with bidirectional NDJSON streaming via stdin/stdout.
6//!
7//! Architecture:
8//!   A single CLI process is spawned on first message and kept alive for the
9//!   entire session. User messages are written as NDJSON lines to stdin without
10//!   closing it. This enables mid-turn input (redirecting the agent while it
11//!   is still processing).
12//!
13//! State machine:
14//!   INIT ─(first message)─> RUNNING ─(idle between turns)─> RUNNING
15//!   RUNNING ─(kill/stop)─> DONE
16//!   RUNNING ─(process crash)─> DONE (auto-restart possible via session_id)
17//!
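//! I/O model (5 async tasks per spawned process):
//! 1. stdin_writer: mpsc channel → process stdin (NDJSON lines)
//! 2. stdout_reader: process stdout → .jsonl persistence + WPS blockfile events;
//!    also feeds the health monitor and captures the agent session ID
//! 3. stderr drain: process stderr → warning-level log lines
//! 4. health watchdog: periodic stalled/dead checks while a turn is active
//! 5. process_waiter: wait for exit (or a kill request), update status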
22
23use std::collections::HashMap;
24use std::sync::{Arc, Mutex};
25
26use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
27use tokio::sync::mpsc;
28
29use super::{
30    BlockControllerRuntimeStatus, BlockInputUnion, Controller, STATUS_DONE, STATUS_INIT,
31    STATUS_RUNNING,
32};
33use super::health::{classify_output_line, HealthMonitor};
34use crate::backend::eventbus::EventBus;
35use crate::backend::storage::filestore::FileStore;
36use crate::backend::storage::wstore::WaveStore;
37use crate::backend::wps;
38
/// Blockfile subject under which every stdout line of the persistent CLI is
/// appended and published over WPS.
pub const PERSISTENT_OUTPUT_SUBJECT: &str = "output";

/// Controller-type tag returned by `PersistentSubprocessController::controller_type`.
pub const BLOCK_CONTROLLER_PERSISTENT: &str = "persistent";
44
/// Everything needed to launch the persistent agent CLI process.
#[derive(Clone, Debug)]
pub struct PersistentSpawnConfig {
    /// Executable to run (resolved through `make_cli_cmd`).
    pub cli_command: String,
    /// Arguments passed to the executable, in order.
    pub cli_args: Vec<String>,
    /// Working directory; a leading `~` is expanded and the directory is
    /// created if missing. Empty means "inherit the server's cwd".
    pub working_dir: String,
    /// Extra environment variables; values get home-directory expansion.
    pub env_vars: HashMap<String, String>,
    /// JSON key in the CLI's stdout lines whose string value is captured as
    /// the agent session ID.
    pub session_id_field: String,
}
54
55/// Inner state protected by mutex.
56struct PersistentInner {
57    proc_status: String,
58    proc_exit_code: i32,
59    status_version: i32,
60    session_id: Option<String>,
61    current_pid: Option<u32>,
62    /// Channel to send messages to the stdin writer task.
63    stdin_tx: Option<mpsc::Sender<String>>,
64    /// Handle to kill the process.
65    kill_tx: Option<tokio::sync::oneshot::Sender<bool>>,
66}
67
68/// PersistentSubprocessController keeps a long-running CLI process alive,
69/// sending user messages as NDJSON lines on stdin.
70pub struct PersistentSubprocessController {
71    #[allow(dead_code)]
72    tab_id: String,
73    block_id: String,
74    inner: Arc<Mutex<PersistentInner>>,
75    broker: Option<Arc<wps::Broker>>,
76    event_bus: Option<Arc<EventBus>>,
77    wstore: Option<Arc<WaveStore>>,
78    /// FileStore for write-through persistence of output lines (Phase 1.3).
79    filestore: Option<Arc<FileStore>>,
80    health_monitor: Arc<HealthMonitor>,
81}
82
83impl PersistentSubprocessController {
84    pub fn new(
85        tab_id: String,
86        block_id: String,
87        broker: Option<Arc<wps::Broker>>,
88        event_bus: Option<Arc<EventBus>>,
89        wstore: Option<Arc<WaveStore>>,
90        filestore: Option<Arc<FileStore>>,
91    ) -> Self {
92        let health_monitor = Arc::new(HealthMonitor::new(
93            block_id.clone(),
94            broker.clone(),
95        ));
96        Self {
97            tab_id,
98            block_id,
99            inner: Arc::new(Mutex::new(PersistentInner {
100                proc_status: STATUS_INIT.to_string(),
101                proc_exit_code: 0,
102                status_version: 0,
103                session_id: None,
104                current_pid: None,
105                stdin_tx: None,
106                kill_tx: None,
107            })),
108            broker,
109            event_bus,
110            wstore,
111            filestore,
112            health_monitor,
113        }
114    }
115
116    fn set_status(inner: &mut PersistentInner, status: &str) {
117        inner.proc_status = status.to_string();
118        inner.status_version += 1;
119    }
120
121    fn get_status_snapshot(&self) -> BlockControllerRuntimeStatus {
122        let inner = self.inner.lock().unwrap();
123        BlockControllerRuntimeStatus {
124            blockid: self.block_id.clone(),
125            version: inner.status_version,
126            shellprocstatus: inner.proc_status.clone(),
127            shellprocconnname: "local".to_string(),
128            shellprocexitcode: inner.proc_exit_code,
129            spawn_ts_ms: None,
130            is_agent_pane: true,
131        }
132    }
133
134    fn publish_status(&self) {
135        if let Some(ref broker) = self.broker {
136            let status = self.get_status_snapshot();
137            super::publish_controller_status(broker, &status);
138        }
139    }
140
141    fn is_running(&self) -> bool {
142        let inner = self.inner.lock().unwrap();
143        inner.stdin_tx.is_some()
144    }
145
146    /// Send a user message to the running CLI process.
147    /// If the process isn't spawned yet, spawns it first.
148    pub fn send_message(&self, message: String, config: PersistentSpawnConfig) -> Result<(), String> {
149        // Spawn process if not running
150        if !self.is_running() {
151            self.spawn_process(config)?;
152        }
153
154        // Format as stream-json user message
155        let json_msg = serde_json::json!({
156            "type": "user",
157            "message": {
158                "role": "user",
159                "content": message
160            }
161        });
162
163        let inner = self.inner.lock().unwrap();
164        let tx = inner.stdin_tx.as_ref()
165            .ok_or("persistent process not running after spawn")?;
166        tx.try_send(json_msg.to_string())
167            .map_err(|e| format!("stdin send failed: {e}"))
168    }
169
170    /// Spawn the persistent CLI process.
171    fn spawn_process(&self, config: PersistentSpawnConfig) -> Result<(), String> {
172        // Build command — use make_cli_cmd to resolve .cmd wrappers to node on Windows
173        let mut cmd = crate::server::cli_handlers::make_cli_cmd(&config.cli_command);
174        cmd.args(&config.cli_args);
175
176        // Working directory
177        if !config.working_dir.is_empty() {
178            let expanded_dir = if config.working_dir.starts_with("~/") || config.working_dir == "~" {
179                if let Some(home) = dirs::home_dir() {
180                    home.join(config.working_dir.trim_start_matches("~/")).to_string_lossy().to_string()
181                } else {
182                    config.working_dir.clone()
183                }
184            } else {
185                config.working_dir.clone()
186            };
187            let dir_path = std::path::Path::new(&expanded_dir);
188            if !dir_path.exists() {
189                if let Err(e) = std::fs::create_dir_all(dir_path) {
190                    tracing::warn!(
191                        block_id = %self.block_id,
192                        dir = %expanded_dir,
193                        error = %e,
194                        "failed to create working directory"
195                    );
196                }
197            }
198            if dir_path.exists() {
199                cmd.current_dir(&expanded_dir);
200
201                // 4.2 follow-up: warn loudly if the agent's working directory
202                // contains a nested .git (typically because the agent, or the
203                // user on its behalf, cloned a repo into its cwd). A 3.5 GB
204                // nested clone was found under ~/.agentmux/agents/agentx/ in
205                // an earlier session and confused agents into reading stale
206                // pre-SolidJS code. We can't prevent the clone (the agent is
207                // an external process) so the best we can do is make it
208                // impossible to miss in the logs, with the exact cleanup
209                // command the user needs to run. Single fs::metadata call —
210                // no directory walk.
211                let looks_like_agent_workspace = expanded_dir.contains("/.agentmux/agents/")
212                    || expanded_dir.contains("\\.agentmux\\agents\\");
213                if looks_like_agent_workspace {
214                    let git_dir = dir_path.join(".git");
215                    if git_dir.exists() {
216                        tracing::warn!(
217                            block_id = %self.block_id,
218                            cwd = %expanded_dir,
219                            ".git detected inside agent workspace — this is \
220                             usually an unintended nested clone and can waste \
221                             gigabytes of disk. Clean up with: rm -rf {}/.git",
222                            expanded_dir
223                        );
224                    }
225                }
226            }
227        }
228
229        // Environment variables (with tilde expansion)
230        for (k, v) in &config.env_vars {
231            let expanded = crate::backend::base::expand_home_dir_safe(v);
232            cmd.env(k, expanded.to_string_lossy().as_ref());
233        }
234
235        cmd.stdin(std::process::Stdio::piped());
236        cmd.stdout(std::process::Stdio::piped());
237        cmd.stderr(std::process::Stdio::piped());
238
239        let mut child = cmd.spawn().map_err(|e| {
240            tracing::error!(block_id = %self.block_id, error = %e, "persistent process spawn failed");
241            format!("failed to spawn persistent process: {e}")
242        })?;
243
244        let pid = child.id().unwrap_or(0);
245
246        // Notify health monitor that a turn is starting. This arms the Stalled
247        // (30 s) and Dead (120 s) thresholds so the frontend learns the agent
248        // is not responding rather than silently waiting forever.
249        self.health_monitor.set_active_turn(true);
250
251        tracing::info!(
252            block_id = %self.block_id,
253            pid = pid,
254            cmd = %config.cli_command,
255            args = ?config.cli_args,
256            working_dir = %config.working_dir,
257            "persistent process spawned"
258        );
259
260        // Assign the persistent CLI to this block's process tracker.
261        // Matches `SubprocessController`'s identical path — both controller
262        // types share the same swarm-pane visibility story.
263        if pid != 0 {
264            if let Some(registry) = crate::backend::process_tracker::registry::global() {
265                let tracker = registry.ensure_tracker(&self.block_id);
266                if let Err(e) = tracker.assign_process(pid) {
267                    tracing::warn!(
268                        block_id = %self.block_id,
269                        pid = pid,
270                        err = %e,
271                        "[process-tracker] assign_process failed"
272                    );
273                }
274            }
275        }
276
277        let (kill_tx, kill_rx) = tokio::sync::oneshot::channel::<bool>();
278        let stdin = child.stdin.take().unwrap();
279        let stdout = child.stdout.take().unwrap();
280        let stderr = child.stderr.take();
281
282        // Drain stderr in background — log lines for debugging
283        if let Some(stderr_pipe) = stderr {
284            let block_id_stderr = self.block_id.clone();
285            tokio::spawn(async move {
286                let mut reader = BufReader::new(stderr_pipe).lines();
287                while let Ok(Some(line)) = reader.next_line().await {
288                    tracing::warn!(
289                        block_id = %block_id_stderr,
290                        line = %line,
291                        "persistent stderr"
292                    );
293                }
294            });
295        }
296
297        // Create stdin writer channel
298        let (msg_tx, mut msg_rx) = mpsc::channel::<String>(32);
299
300        {
301            let mut inner = self.inner.lock().unwrap();
302            inner.current_pid = Some(pid);
303            inner.kill_tx = Some(kill_tx);
304            inner.stdin_tx = Some(msg_tx);
305            Self::set_status(&mut inner, STATUS_RUNNING);
306        }
307        self.publish_status();
308
309        // Record active pid for crash recovery (Phase 4.2). If the server
310        // dies while this subprocess is running, scan_orphans() will find
311        // the stale pid on next boot and flag the session as interrupted.
312        if let Some(ref wstore) = self.wstore {
313            super::session_recovery::mark_active_pid(wstore, &self.block_id, pid);
314        }
315
316        // Spawn stdin writer task
317        tokio::spawn(async move {
318            let mut stdin = stdin;
319            while let Some(msg) = msg_rx.recv().await {
320                if let Err(e) = stdin.write_all(msg.as_bytes()).await {
321                    tracing::warn!("persistent stdin write error: {}", e);
322                    break;
323                }
324                if let Err(e) = stdin.write_all(b"\n").await {
325                    tracing::warn!("persistent stdin newline error: {}", e);
326                    break;
327                }
328                if let Err(e) = stdin.flush().await {
329                    tracing::warn!("persistent stdin flush error: {}", e);
330                    break;
331                }
332            }
333            // Channel closed or write error → stdin drops → process gets EOF
334            drop(stdin);
335        });
336
337        // Spawn stdout reader task
338        let block_id_read = self.block_id.clone();
339        let broker_read = self.broker.clone();
340        let inner_read = Arc::clone(&self.inner);
341        let wstore_read = self.wstore.clone();
342        let event_bus_read = self.event_bus.clone();
343        let filestore_read = self.filestore.clone();
344        let health_read = Arc::clone(&self.health_monitor);
345        let session_id_field = config.session_id_field.clone();
346
347        tokio::spawn(async move {
348            let reader = BufReader::new(stdout);
349            let mut lines = reader.lines();
350            let mut stats = super::session_stats::SessionStatsAccumulator::new(block_id_read.clone());
351
352            while let Ok(Some(line)) = lines.next_line().await {
353                if line.trim().is_empty() {
354                    continue;
355                }
356
357                // Track session metadata (debounced 1 s)
358                stats.record_line(line.len(), &wstore_read);
359
360                // Parse JSON for health monitoring and session ID capture
361                if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&line) {
362                    let (meaningful, _error) = classify_output_line(&parsed);
363                    health_read.record_output(meaningful);
364                    if let Some(sid) = parsed.get(&session_id_field).and_then(|v| v.as_str()) {
365                        let sid_string = sid.to_string();
366                        let already_captured = inner_read.lock().unwrap().session_id.is_some();
367                        if !already_captured {
368                            tracing::info!(
369                                block_id = %block_id_read,
370                                session_id = %sid_string,
371                                "persistent session ID captured"
372                            );
373                            {
374                                let mut inner = inner_read.lock().unwrap();
375                                inner.session_id = Some(sid_string.clone());
376                            }
377                            // Persist to block metadata (same pattern as subprocess.rs)
378                            if let Some(ref store) = wstore_read {
379                                let oref_str = format!("block:{}", block_id_read);
380                                let mut meta_update =
381                                    crate::backend::obj::MetaMapType::new();
382                                meta_update.insert(
383                                    "agent:sessionid".to_string(),
384                                    serde_json::Value::String(sid_string),
385                                );
386                                if let Err(e) = crate::server::service::update_object_meta(
387                                    store, &oref_str, &meta_update,
388                                ) {
389                                    tracing::warn!(
390                                        block_id = %block_id_read,
391                                        error = %e,
392                                        "failed to persist agent:sessionid"
393                                    );
394                                } else if let Some(ref event_bus) = event_bus_read {
395                                    if let Ok(updated_block) = store.must_get::<crate::backend::obj::Block>(&block_id_read) {
396                                        let update_data = serde_json::to_value(
397                                            &crate::backend::obj::WaveObjUpdate {
398                                                updatetype: "update".into(),
399                                                otype: "block".into(),
400                                                oid: block_id_read.clone(),
401                                                obj: Some(crate::backend::obj::wave_obj_to_value(&updated_block)),
402                                            },
403                                        )
404                                        .ok();
405                                        event_bus.broadcast_event(
406                                            &crate::backend::eventbus::WSEventType {
407                                                eventtype: "waveobj:update".to_string(),
408                                                oref: oref_str,
409                                                data: update_data,
410                                            },
411                                        );
412                                    }
413                                }
414                            }
415                        }
416                    }
417                }
418
419                // Publish line as WPS blockfile event and write-through to FileStore
420                // for persistent history (Phase 1.3).
421                tracing::info!(
422                    block_id = %block_id_read,
423                    line_len = line.len(),
424                    "persistent stdout → blockfile"
425                );
426                let line_with_newline = format!("{}\n", line);
427                if let Some(ref broker) = broker_read {
428                    super::shell::handle_append_block_file(
429                        broker,
430                        &block_id_read,
431                        PERSISTENT_OUTPUT_SUBJECT,
432                        line_with_newline.as_bytes(),
433                        filestore_read.as_ref(),
434                    );
435                } else {
436                    tracing::warn!(block_id = %block_id_read, "persistent stdout: no broker available");
437                }
438            }
439
440            tracing::info!(block_id = %block_id_read, "persistent stdout reader finished");
441        });
442
443        // Spawn health watchdog — checks every 5 s while turn is active.
444        // Emits `agenthealth` WPS events when the process stalls (30 s) or
445        // dies (120 s) without producing meaningful output, giving the
446        // frontend enough signal to show a "not responding" warning.
447        let health_watchdog = Arc::clone(&self.health_monitor);
448        tokio::spawn(async move {
449            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
450            loop {
451                interval.tick().await;
452                if !health_watchdog.is_active_turn() {
453                    break;
454                }
455                health_watchdog.check();
456            }
457        });
458
459        // Spawn process waiter task
460        let block_id_wait = self.block_id.clone();
461        let inner_wait = Arc::clone(&self.inner);
462        let broker_wait = self.broker.clone();
463        let wstore_wait = self.wstore.clone();
464        let health_wait = Arc::clone(&self.health_monitor);
465
466        tokio::spawn(async move {
467            tokio::select! {
468                status = child.wait() => {
469                    let exit_code = status.map(|s| s.code().unwrap_or(-1)).unwrap_or(-1);
470                    tracing::info!(
471                        block_id = %block_id_wait,
472                        exit_code = exit_code,
473                        "persistent process exited"
474                    );
475
476                    // Notify health monitor so Stalled/Dead watchdog stops.
477                    health_wait.set_exited(exit_code);
478
479                    let mut inner = inner_wait.lock().unwrap();
480                    inner.proc_exit_code = exit_code;
481                    inner.current_pid = None;
482                    inner.stdin_tx = None;
483                    inner.kill_tx = None;
484                    Self::set_status(&mut inner, STATUS_DONE);
485                    drop(inner);
486
487                    // Clear active pid — clean exit, no recovery needed.
488                    if let Some(ref wstore) = wstore_wait {
489                        super::session_recovery::clear_active_pid(wstore, &block_id_wait);
490                    }
491
492                    // Publish status
493                    if let Some(ref broker) = broker_wait {
494                        let status = BlockControllerRuntimeStatus {
495                            blockid: block_id_wait.clone(),
496                            version: 0,
497                            shellprocstatus: STATUS_DONE.to_string(),
498                            shellprocconnname: "local".to_string(),
499                            shellprocexitcode: exit_code,
500                            spawn_ts_ms: None,
501                            is_agent_pane: true,
502                        };
503                        super::publish_controller_status(broker, &status);
504                    }
505                }
506                Ok(force) = kill_rx => {
507                    tracing::info!(
508                        block_id = %block_id_wait,
509                        force = force,
510                        "persistent process kill requested"
511                    );
512                    if force {
513                        let _ = child.kill().await;
514                    } else {
515                        // Graceful: drop stdin to send EOF, then wait briefly
516                        {
517                            let mut inner = inner_wait.lock().unwrap();
518                            inner.stdin_tx = None; // drops the sender → stdin writer exits → stdin closes
519                        }
520                        tokio::select! {
521                            _ = child.wait() => {}
522                            _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
523                                let _ = child.kill().await;
524                            }
525                        }
526                    }
527
528                    health_wait.set_exited(-1);
529
530                    let mut inner = inner_wait.lock().unwrap();
531                    inner.proc_exit_code = -1;
532                    inner.current_pid = None;
533                    inner.stdin_tx = None;
534                    inner.kill_tx = None;
535                    Self::set_status(&mut inner, STATUS_DONE);
536                    drop(inner);
537
538                    // Clear active pid — user-initiated stop, no recovery needed.
539                    if let Some(ref wstore) = wstore_wait {
540                        super::session_recovery::clear_active_pid(wstore, &block_id_wait);
541                    }
542                }
543            }
544        });
545
546        Ok(())
547    }
548
549    pub fn stop_process(&self, force: bool) -> Result<(), String> {
550        let kill_tx = {
551            let mut inner = self.inner.lock().unwrap();
552            inner.kill_tx.take()
553        };
554        match kill_tx {
555            Some(tx) => {
556                let _ = tx.send(force);
557                Ok(())
558            }
559            None => Ok(()),
560        }
561    }
562
563    pub fn session_id(&self) -> Option<String> {
564        self.inner.lock().unwrap().session_id.clone()
565    }
566}
567
568impl Controller for PersistentSubprocessController {
569    fn start(
570        &self,
571        _block_meta: super::super::obj::MetaMapType,
572        _rt_opts: Option<serde_json::Value>,
573        _force: bool,
574    ) -> Result<(), String> {
575        tracing::info!(
576            block_id = %self.block_id,
577            "persistent controller registered (spawns on first message)"
578        );
579        Ok(())
580    }
581
582    fn stop(&self, _graceful: bool, new_status: &str) -> Result<(), String> {
583        self.stop_process(true)?;
584        let mut inner = self.inner.lock().unwrap();
585        if inner.proc_status != new_status {
586            Self::set_status(&mut inner, new_status);
587        }
588        Ok(())
589    }
590
591    fn get_runtime_status(&self) -> BlockControllerRuntimeStatus {
592        self.get_status_snapshot()
593    }
594
595    fn send_input(&self, _input: BlockInputUnion, _seq: Option<u64>) -> Result<(), String> {
596        Err("persistent controller does not accept raw input; use send_message()".to_string())
597    }
598
599    fn controller_type(&self) -> &str {
600        BLOCK_CONTROLLER_PERSISTENT
601    }
602
603    fn block_id(&self) -> &str {
604        &self.block_id
605    }
606
607    fn as_any(&self) -> &dyn std::any::Any {
608        self
609    }
610}